---
title: DASE Components Explained (Classification)
---

<%= partial 'shared/dase/dase',
locals: {
  template_name: 'Classification Engine Template',
  evaluation_link: '/evaluation/paramtuning/'
} %>

## The Engine Design

As you can see from the Quick Start, *MyClassification* takes a JSON prediction
query, e.g. `{ "attr0":4, "attr1":3, "attr2":8 }`, and returns a JSON predicted result.

WARNING: For versions before v0.3.1, the query is an array of feature values instead: `{ "features": [4, 3, 8] }`

In MyClassification/src/main/scala/***Engine.scala***, the `Query` class
defines the format of the **query**, such as `{ "attr0":4, "attr1":3, "attr2":8 }`:

```scala
class Query(
  val attr0 : Double,
  val attr1 : Double,
  val attr2 : Double
) extends Serializable

```

The `PredictedResult` class defines the format of the **predicted result**,
such as `{"label":2.0}`:

```scala
class PredictedResult(
  val label: Double
) extends Serializable
```

Finally, `ClassificationEngine` is the Engine Factory that defines the
components this engine will use: Data Source, Data Preparator, Algorithm(s) and
Serving components.

```scala
object ClassificationEngine extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map("naive" -> classOf[NaiveBayesAlgorithm]),
      classOf[Serving])
  }
}
```

### Spark MLlib

Spark's MLlib NaiveBayes algorithm takes training data as an RDD, i.e.
`RDD[LabeledPoint]`, and trains a model, which is a `NaiveBayesModel` object.

PredictionIO's MLlib Classification engine template, which *MyClassification*
is based on, integrates this algorithm under the DASE architecture. We will take a
closer look at the DASE code below.

> [Check this out](https://spark.apache.org/docs/latest/mllib-naive-bayes.html)
to learn more about MLlib's NaiveBayes algorithm.

## Data

In the DASE architecture, data is prepared by two components in sequence: *Data
Source* and *Data Preparator*. Together they take data from the data store and
prepare `RDD[LabeledPoint]` for the NaiveBayes algorithm.

### Data Source

In MyClassification/src/main/scala/***DataSource.scala***, the `readTraining`
method of the class `DataSource` reads and selects data from the Event Server's
data store and returns `TrainingData`.

```scala
case class DataSourceParams(appName: String) extends Params

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData, EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {

    val labeledPoints: RDD[LabeledPoint] = PEventStore.aggregateProperties(
      appName = dsp.appName,
      entityType = "user",
      // only keep entities with these required properties defined
      required = Some(List("plan", "attr0", "attr1", "attr2")))(sc)
      // aggregateProperties() returns RDD pair of
      // entity ID and its aggregated properties
      .map { case (entityId, properties) =>
        try {
          LabeledPoint(properties.get[Double]("plan"),
            Vectors.dense(Array(
              properties.get[Double]("attr0"),
              properties.get[Double]("attr1"),
              properties.get[Double]("attr2")
            ))
          )
        } catch {
          case e: Exception => {
            logger.error(s"Failed to get properties ${properties} of" +
              s" ${entityId}. Exception: ${e}.")
            throw e
          }
        }
      }.cache()

    new TrainingData(labeledPoints)
  }
}
```

`PEventStore` is an object that provides functions to access data collected
through the *Event Server*. `PEventStore.aggregateProperties` aggregates the
event records of the four properties (attr0, attr1, attr2 and plan) for each user.
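
To make the aggregation step concrete, here is a minimal sketch in plain Scala collections of what aggregating properties per entity and filtering on `required` amounts to. It is a simplified model, not PredictionIO's implementation: `SetEvent`, `AggregateSketch` and `aggregate` are hypothetical names introduced for illustration, and the sketch only handles events that set properties, with later events overriding earlier ones.

```scala
// Hypothetical, simplified stand-in for a property-setting event (illustration only).
case class SetEvent(entityId: String, time: Long, properties: Map[String, Double])

object AggregateSketch {
  // Merge each entity's events in time order (later values win), then keep
  // only the entities that define every required property.
  def aggregate(
    events: Seq[SetEvent],
    required: Seq[String]
  ): Map[String, Map[String, Double]] = {
    events
      .groupBy(_.entityId)
      .map { case (id, evs) =>
        val merged = evs.sortBy(_.time)
          .foldLeft(Map.empty[String, Double])((acc, e) => acc ++ e.properties)
        id -> merged
      }
      .filter { case (_, props) => required.forall(p => props.contains(p)) }
  }

  val events = Seq(
    SetEvent("u1", 1L, Map("plan" -> 1.0, "attr0" -> 4.0, "attr1" -> 3.0)),
    SetEvent("u1", 2L, Map("attr2" -> 8.0, "attr0" -> 5.0)),
    // u2 never sets attr1 or attr2, so it is filtered out below.
    SetEvent("u2", 1L, Map("plan" -> 0.0, "attr0" -> 2.0))
  )

  val result = aggregate(events, Seq("plan", "attr0", "attr1", "attr2"))
}
```

In this sketch only `u1` survives, with `attr0` taken from its later event. This mirrors why the `map` step in `readTraining` can call `properties.get[Double](...)` on all four properties: entities missing any of them have already been dropped.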

PredictionIO automatically loads the *datasource* parameters specified in
MyClassification/***engine.json***, including *appName*, into `dsp`.

In ***engine.json***:

```
{
  ...
  "datasource": {
    "params": {
      "appName": "MyApp1"
    }
  },
  ...
}
```

In this sample text data file, columns are delimited by commas (,). The first
column contains the labels. The second column contains the features.

The class definition of `TrainingData` is:

```scala
class TrainingData(
  val labeledPoints: RDD[LabeledPoint]
) extends Serializable
```
PredictionIO passes the returned `TrainingData` object to *Data Preparator*.


### Data Preparator

In MyClassification/src/main/scala/***Preparator.scala***, the `prepare` method
of class `Preparator` takes `TrainingData`. It then performs any necessary
feature selection and data processing tasks. At the end, it returns
`PreparedData`, which should contain the data *Algorithm* needs. For MLlib
NaiveBayes, it is `RDD[LabeledPoint]`.

By default, `prepare` simply copies the unprocessed `TrainingData` data to
`PreparedData`:

```scala
class PreparedData(
  val labeledPoints: RDD[LabeledPoint]
) extends Serializable

class Preparator
  extends PPreparator[TrainingData, PreparedData] {

  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    new PreparedData(trainingData.labeledPoints)
  }
}
```

PredictionIO passes the returned `PreparedData` object to Algorithm's `train`
function.
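
As an illustration of the kind of feature selection and data processing `prepare` could perform, here is a minimal sketch. It uses plain Scala collections in place of `RDD` so that it runs without Spark; `Point`, `PrepareSketch` and the `keep` parameter are hypothetical names, not part of the template. In the real `Preparator`, the same steps would be expressed with `map` and `filter` on `trainingData.labeledPoints`.

```scala
// Hypothetical stand-in for MLlib's LabeledPoint so the sketch runs without Spark.
case class Point(label: Double, features: Seq[Double])

object PrepareSketch {
  // Keep only the feature columns at the given indices, then drop rows that
  // contain a negative value in any of the kept columns.
  def prepare(points: Seq[Point], keep: Seq[Int]): Seq[Point] =
    points
      .map(p => Point(p.label, keep.map(i => p.features(i))))
      .filter(p => p.features.forall(_ >= 0.0))

  val training = Seq(
    Point(1.0, Seq(4.0, 3.0, 8.0)),
    Point(0.0, Seq(2.0, -1.0, 5.0)), // negative value is in a dropped column
    Point(0.0, Seq(-2.0, 1.0, 5.0))  // negative value is in a kept column
  )

  // Use attr0 and attr2 only (indices 0 and 2).
  val prepared = prepare(training, Seq(0, 2))
}
```

The negative-value filter is included because MLlib's NaiveBayes expects non-negative feature values. If you select features this way, the `predict` method would have to build its query vector from the same columns in the same order.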

## Algorithm

In MyClassification/src/main/scala/***NaiveBayesAlgorithm.scala***, the two
methods of the algorithm class are `train` and `predict`. `train` is responsible
for training a predictive model, which PredictionIO stores. `predict` is
responsible for using this model to make predictions.

### train(...)

`train` is called when you run **pio train**. This is where the MLlib NaiveBayes
algorithm, i.e. `NaiveBayes.train`, is used to train a predictive model.

```scala
def train(sc: SparkContext, data: PreparedData): NaiveBayesModel = {
  NaiveBayes.train(data.labeledPoints, ap.lambda)
}
```

In addition to `RDD[LabeledPoint]` (i.e. `data.labeledPoints`),
`NaiveBayes.train` takes one parameter: *lambda*.

The value of this parameter is specified in the *algorithms* section of
MyClassification/***engine.json***:

```
{
  ...
  "algorithms": [
    {
      "name": "naive",
      "params": {
        "lambda": 1.0
      }
    }
  ]
  ...
}
```

PredictionIO automatically loads these values into the constructor parameter
`ap`, which has a corresponding case class `AlgorithmParams`:

```scala
case class AlgorithmParams(
  lambda: Double
) extends Params
```

`NaiveBayes.train` then returns a `NaiveBayesModel` model. PredictionIO will
automatically store the returned model.
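
To give a feel for what *lambda* controls, the following is a minimal sketch of additive (Laplace) smoothing for class priors and per-feature likelihoods, as commonly defined for multinomial Naive Bayes. It is written in plain Scala so that it runs without Spark. `SmoothingSketch`, `logPriors` and `logLikelihoods` are hypothetical names, and the formulas are the textbook ones, assumed here to approximate what MLlib does rather than taken from its source.

```scala
object SmoothingSketch {
  // (label, feature counts) rows; a stand-in for RDD[LabeledPoint].
  type Row = (Double, Seq[Double])

  // Smoothed log prior per label: log((n_c + lambda) / (N + k * lambda)),
  // where n_c is the row count for label c, N the total row count and
  // k the number of distinct labels.
  def logPriors(rows: Seq[Row], lambda: Double): Map[Double, Double] = {
    val counts = rows.groupBy(_._1).map { case (c, rs) => c -> rs.size.toDouble }
    val n = rows.size.toDouble
    val k = counts.size.toDouble
    counts.map { case (c, nc) => c -> math.log((nc + lambda) / (n + k * lambda)) }
  }

  // Smoothed log likelihood per label and feature:
  // log((sum_cj + lambda) / (sum_c + d * lambda)), where sum_cj is the total of
  // feature j over rows with label c, sum_c the total over all features for
  // label c, and d the number of features.
  def logLikelihoods(rows: Seq[Row], lambda: Double): Map[Double, Seq[Double]] =
    rows.groupBy(_._1).map { case (c, rs) =>
      val sums = rs.map(_._2).transpose.map(_.sum)
      val total = sums.sum
      val d = sums.size.toDouble
      c -> sums.map(s => math.log((s + lambda) / (total + d * lambda)))
    }

  val rows: Seq[Row] = Seq(
    (0.0, Seq(2.0, 0.0, 0.0)),
    (0.0, Seq(1.0, 1.0, 0.0)),
    (1.0, Seq(0.0, 0.0, 3.0))
  )
}
```

In this sketch, calling `logLikelihoods(rows, 0.0)` gives negative infinity for the third feature under label `0.0`, because that feature was never observed with that label. That would rule the label out for any query that has the feature. With `lambda` set to `1.0`, as in the ***engine.json*** above, every likelihood stays finite.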

### predict(...)

The `predict` method is called when you send a JSON query to
http://localhost:8000/queries.json. PredictionIO converts the query, such as
`{ "attr0":4, "attr1":3, "attr2":8 }`, to an instance of the `Query` class you
defined previously.

The predictive model `NaiveBayesModel` of MLlib NaiveBayes offers a function
called `predict`. `predict` takes a dense vector of features. It predicts the
label of the item represented by this feature vector.

```scala
  def predict(model: NaiveBayesModel, query: Query): PredictedResult = {
    val label = model.predict(Vectors.dense(
      query.attr0, query.attr1, query.attr2
    ))
    new PredictedResult(label)
  }
```

> You have defined the class `PredictedResult` earlier in this page.

PredictionIO passes the returned `PredictedResult` object to *Serving*.

## Serving

The `serve` method of class `Serving` processes the predicted results. It is also
responsible for combining multiple predicted results into one if you have more
than one predictive model. *Serving* then returns the final predicted result.
PredictionIO will convert it to a JSON response automatically.

In MyClassification/src/main/scala/***Serving.scala***,

```scala
class Serving
  extends LServing[Query, PredictedResult] {

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {
    predictedResults.head
  }
}
```

When you send a JSON query to http://localhost:8000/queries.json, the
`PredictedResult` from every model is passed to `serve` as a sequence, i.e.
`Seq[PredictedResult]`.

> An engine can train multiple models if you specify more than one Algorithm
component in `object ClassificationEngine` inside ***Engine.scala***. Since only
one `NaiveBayesAlgorithm` is implemented by default, this `Seq` contains one
element.

In this case, `serve` simply returns the predicted result of the first, and
only, algorithm, i.e. `predictedResults.head`.
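
If an engine did register several algorithms, `serve` would need a rule for combining their results. The following is a minimal sketch of one possible rule, a majority vote over the predicted labels. It defines local copies of `Query` and `PredictedResult` so that it runs on its own, and `ServingSketch` is a hypothetical name. In a real engine this logic would go in the body of `serve` in the `Serving` class shown above.

```scala
// Local copies of the classes from this page so the sketch runs on its own.
class Query(val attr0: Double, val attr1: Double, val attr2: Double)
  extends Serializable
class PredictedResult(val label: Double) extends Serializable

object ServingSketch {
  // Return the label predicted by the most algorithms. On a tie, prefer the
  // label that appears first in predictedResults. Like `predictedResults.head`,
  // this assumes the sequence is non-empty.
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {
    val votes = predictedResults.groupBy(_.label).map { case (l, rs) => l -> rs.size }
    val best = votes.values.max
    val winner = predictedResults.map(_.label).find(l => votes(l) == best).get
    new PredictedResult(winner)
  }
}
```

With a single algorithm this reduces to returning `predictedResults.head`, so the default behaviour is unchanged.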

Congratulations! You have just learned how to customize and build a
production-ready engine. Have fun!
